package com.yuepeng.camel;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.camel.component.activemq.ActiveMQComponent;
import org.apache.camel.component.jms.JmsComponent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.jms.connection.JmsTransactionManager;

/**
 * Spring configuration that wires a transacted Camel ActiveMQ component on top of a pooled
 * ActiveMQ connection factory and a Spring {@link JmsTransactionManager}.
 *
 * <p>Properties are loaded from {@code my-application.properties} on the classpath; the broker
 * URL is taken from the {@code custom.broker.url} key.
 */
@Configuration
@PropertySource(value = "classpath:my-application.properties")
public class JmsConfig {

    /** Upper bound on the number of connections kept by the pooled connection factory. */
    private static final int MAX_POOLED_CONNECTIONS = 8;

    /** Number of concurrent consumers configured on the Camel component. */
    private static final int CONCURRENT_CONSUMERS = 4;

    /** Broker URL injected from the {@code custom.broker.url} property. */
    @Value("${custom.broker.url}")
    private String brokerUrl;

    /**
     * Builds the pooled JMS connection factory that wraps the plain ActiveMQ factory so that
     * connections and sessions can be shared across the consumers.
     *
     * <p>Spring calls {@code start} after construction and {@code stop} on shutdown.
     * See https://camel.apache.org/components/latest/eips/transactional-client.html#TransactionalClient-JMSSample
     *
     * @param jmsConnectionFactory the underlying ActiveMQ connection factory to pool
     * @return the pooled connection factory, limited to {@value #MAX_POOLED_CONNECTIONS} connections
     */
    @Bean(destroyMethod = "stop", initMethod = "start")
    PooledConnectionFactory poolConnectionFactory(final ActiveMQConnectionFactory jmsConnectionFactory) {
        final PooledConnectionFactory pool = new PooledConnectionFactory();
        pool.setConnectionFactory(jmsConnectionFactory);
        // Open connections can be inspected in the broker web console,
        // e.g. http://127.0.0.1:8161/admin/connections.jsp
        // NOTE(review): the earlier note here mentioned a library default of 1 - confirm against
        // the ActiveMQ pool documentation; this configuration explicitly uses a larger pool.
        pool.setMaxConnections(MAX_POOLED_CONNECTIONS);
        return pool;
    }

    /**
     * Builds the plain (non-pooled) ActiveMQ connection factory pointing at the configured broker.
     *
     * @return a connection factory whose broker URL is the injected {@code custom.broker.url} value
     */
    @Bean
    ActiveMQConnectionFactory jmsConnectionFactory() {
        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL(brokerUrl);
        return factory;
    }

    /**
     * Builds the Spring JMS transaction manager.
     *
     * <p>Camel recommends Spring transactions to support the Transactional Client EIP.
     *
     * @param poolConnectionFactory the pooled connection factory the transactions run against
     * @return a transaction manager bound to the pooled connection factory
     */
    @Bean
    public JmsTransactionManager jmsTransactionManager(final PooledConnectionFactory poolConnectionFactory) {
        final JmsTransactionManager txManager = new JmsTransactionManager();
        txManager.setConnectionFactory(poolConnectionFactory);
        return txManager;
    }

    /**
     * Defines the Camel component registered under the bean name {@code activemq}.
     *
     * <p>The component is transacted, uses the supplied transaction manager (lazy creation of a
     * transaction manager is switched off), caches at the {@code CACHE_CONSUMER} level and
     * acknowledges in {@code SESSION_TRANSACTED} mode.
     *
     * @param poolConnectionFactory the pooled connection factory used by the component
     * @param jmsTransactionManager the transaction manager used for transacted routes
     * @return the configured ActiveMQ component
     */
    @Bean(name = "activemq")
    public JmsComponent jmsComponent(final PooledConnectionFactory poolConnectionFactory,
                                     final JmsTransactionManager jmsTransactionManager) {
        final ActiveMQComponent component = new ActiveMQComponent();

        // Connection and transaction wiring.
        component.setConnectionFactory(poolConnectionFactory);
        component.setTransactionManager(jmsTransactionManager);
        component.setUsePooledConnection(true);
        component.setTransacted(true);
        component.setLazyCreateTransactionManager(false);

        // Caching and acknowledgement behaviour.
        component.setCacheLevelName("CACHE_CONSUMER");
        component.setAcknowledgementModeName("SESSION_TRANSACTED");

        // Throughput can be tuned through the number of concurrent consumers; see
        // "concurrentConsumers" in https://camel.apache.org/components/latest/activemq-component.html
        // (documented there with a default of 1). The effective value shows up as
        // "Number Of Consumers" in http://127.0.0.1:8161/admin/queues.jsp
        component.setConcurrentConsumers(CONCURRENT_CONSUMERS);
        return component;
    }

}
